home *** CD-ROM | disk | FTP | other *** search
/ Complete Linux / Complete Linux.iso / docs / apps / database / postgres / postgre4.z / postgre4 / src / access / nobtree / nobtinsert.c < prev    next >
Encoding:
C/C++ Source or Header  |  1992-08-27  |  32.2 KB  |  1,055 lines

  1. /*
  2.  *  nobtinsert.c -- Item insertion in Lehman and Yao btrees for Postgres.
  3.  */
  4.  
  5. #include "tmp/c.h"
  6. #ifdef NOBTREE
  7. #include "tmp/postgres.h"
  8.  
  9. #include "storage/bufmgr.h"
  10. #include "storage/bufpage.h"
  11. #include "storage/page.h"
  12.  
  13. #include "utils/log.h"
  14. #include "utils/rel.h"
  15. #include "utils/excid.h"
  16.  
  17. #include "access/heapam.h"
  18. #include "access/genam.h"
  19. #include "access/ftup.h"
  20. #include "access/nobtree.h"
  21.  
  22. RcsId("$Header: /private/postgres/src/access/nobtree/RCS/nobtinsert.c,v 1.6 1991/06/26 19:12:14 mao Exp $");
  23.  
  24. extern bool    NOBT_Building;
  25. extern uint32    CurrentLinkToken;
  26. extern int    NOBT_NSplits;
  27.  
  28. /*
  29.  *  _nobt_doinsert() -- Handle insertion of a single btitem in the tree.
  30.  *
  31.  *    This routine is called by the public interface routines, nobtbuild
  32.  *    and nobtinsert.  By here, btitem is filled in, and has a unique
  33.  *    (xid, seqno) pair.
  34.  */
  35.  
  36. InsertIndexResult
  37. _nobt_doinsert(rel, btitem)
  38.     Relation rel;
  39.     NOBTLItem btitem;
  40. {
  41.     ScanKey itup_scankey;
  42.     IndexTuple itup;
  43.     NOBTStack stack;
  44.     Buffer buf;
  45.     BlockNumber blkno;
  46.     OffsetIndex itup_off;
  47.     int natts;
  48.     InsertIndexResult res;
  49.  
  50.     itup = &(btitem->nobtli_itup);
  51.  
  52.     /* we need a scan key to do our search, so build one */
  53.     itup_scankey = _nobt_mkscankey(rel, itup);
  54.     natts = rel->rd_rel->relnatts;
  55.  
  56.     /* find the page containing this key */
  57.     stack = _nobt_search(rel, natts, itup_scankey, &buf);
  58.     blkno = BufferGetBlockNumber(buf);
  59.  
  60.     /* trade in our read lock for a write lock */
  61.     _nobt_relbuf(rel, buf, NOBT_READ);
  62.     buf = _nobt_getbuf(rel, blkno, NOBT_WRITE);
  63.  
  64.     /*
  65.      *  If the page was split between the time that we surrendered our
  66.      *  read lock and acquired our write lock, then this page may no
  67.      *  longer be the right place for the key we want to insert.  In this
  68.      *  case, we need to move right in the tree.  See Lehman and Yao for
  69.      *  an excruciatingly precise description.
  70.      */
  71.  
  72.     buf = _nobt_moveright(rel, buf, natts, itup_scankey, NOBT_WRITE, stack);
  73.  
  74.     /* do the insertion */
  75.     res = _nobt_insertonpg(rel, buf, stack, natts, itup_scankey,
  76.                btitem, (NOBTIItem) NULL);
  77.  
  78.     /* be tidy */
  79.     _nobt_freestack(stack);
  80.     _nobt_freeskey(itup_scankey);
  81.  
  82.     return (res);
  83. }
  84.  
  85. /*
  86.  *  _nobt_insertonpg() -- Insert a tuple on a particular page in the index.
  87.  *
  88.  *    This recursive procedure does the following things:
  89.  *
  90.  *        +  if necessary, splits the target page.
  91.  *        +  finds the right place to insert the tuple (taking into
  92.  *           account any changes induced by a split).
  93.  *        +  inserts the tuple.
  94.  *        +  if the page was split, pops the parent stack, and finds the
  95.  *           right place to insert the new child pointer (by walking
  96.  *           right using information stored in the parent stack).
  97.  *        +  invoking itself with the appropriate tuple for the right
  98.  *           child page on the parent.
  99.  *
  100.  *    On entry, we must have the right buffer on which to do the
  101.  *    insertion, and the buffer must be pinned and locked.  On return,
  102.  *    we will have dropped both the pin and the write lock on the buffer.
  103.  *
  104.  *    The locking interactions in this code are critical.  You should
  105.  *    grok Lehman and Yao's paper before making any changes.  In addition,
  106.  *    you need to understand how we disambiguate duplicate keys in this
  107.  *    implementation, in order to be able to find our location using
  108.  *    L&Y "move right" operations.  Since we may insert duplicate user
  109.  *    keys, and since these dups may propogate up the tree, we use the
  110.  *    'afteritem' parameter to position ourselves correctly for the
  111.  *    insertion on internal pages.
  112.  */
  113.  
  114. InsertIndexResult
  115. _nobt_insertonpg(rel, buf, stack, keysz, scankey, btvoid, afteritem)
  116.     Relation rel;
  117.     Buffer buf;
  118.     NOBTStack stack;
  119.     int keysz;
  120.     ScanKey scankey;
  121.     char *btvoid;
  122.     NOBTIItem afteritem;
  123. {
  124.     NOBTIItem btiitem;
  125.     NOBTLItem btlitem;
  126.     InsertIndexResult res;
  127.     Page page;
  128.     bool isleaf;
  129.     NOBTPageOpaque opaque;
  130.     Buffer rbuf;
  131.     Buffer pbuf;
  132.     Page rpage;
  133.     ScanKey newskey;
  134.     NOBTPageOpaque rpageop;
  135.     BlockNumber rbknum, itup_blkno;
  136.     OffsetIndex itup_off;
  137.     int itemsz;
  138.     int ritemsz;
  139.     ItemId hikey;
  140.     InsertIndexResult newres;
  141.     Page ppage;
  142.     NOBTIItem lftitem, new_item;
  143.     NOBTIItem riitem;
  144.     NOBTLItem rlitem;
  145.     char *datump;
  146.     int dsize;
  147.  
  148.     page = BufferGetPage(buf, 0);
  149.     opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  150.     if (opaque->nobtpo_flags & NOBTP_LEAF) {
  151.     isleaf = true;
  152.     btlitem = (NOBTLItem) btvoid;
  153.     itemsz = IndexTupleDSize(btlitem->nobtli_itup)
  154.            + (sizeof(NOBTLItemData) - sizeof(IndexTupleData));
  155.     } else {
  156.     isleaf = false;
  157.     btiitem = (NOBTIItem) btvoid;
  158.     itemsz = NOBTIItemGetSize(btiitem);
  159.     }
  160.  
  161. #ifdef    REORG
  162. #define SYNC_EVERYTHING()
  163.     /* can only split every page one time between syncs */
  164.     if (opaque->nobtpo_linktok == CurrentLinkToken && !NOBT_Building) {
  165.     /* have to flush dirty buffers, force everything down to disk */
  166.     SYNC_EVERYTHING();
  167.     }
  168. #endif    /* REORG */
  169.  
  170. #ifndef    REORG
  171.     if (PageGetFreeSpace(page) < itemsz) {
  172. #else    /* REORG */
  173.     if (PageGetFreeSpace(page) < ((2 * itemsz) + sizeof(uint16))) {
  174. #endif    /* REORG */
  175.  
  176.     /* split the buffer into left and right halves */
  177.     NOBT_NSplits++;
  178. #ifdef    SHADOW
  179.     rbuf = _nobt_nsplit(rel, &buf);
  180. #endif    /* SHADOW */
  181. #ifdef    REORG
  182.     rbuf = _nobt_bsplit(rel, buf);
  183. #endif    /* REORG */
  184. #ifdef    NORMAL
  185.     rbuf = _nobt_bsplit(rel, buf);
  186. #endif    /* NORMAL */
  187.  
  188.     /* which new page (left half or right half) gets the tuple? */
  189.     if (_nobt_goesonpg(rel, buf, keysz, scankey, afteritem)) {
  190.         itup_off = _nobt_pgaddtup(rel, buf, keysz, scankey,
  191.                        itemsz, btvoid, afteritem);
  192.         itup_blkno = BufferGetBlockNumber(buf);
  193.     } else {
  194.         itup_off = _nobt_pgaddtup(rel, rbuf, keysz, scankey,
  195.                        itemsz, btvoid, afteritem);
  196.         itup_blkno = BufferGetBlockNumber(rbuf);
  197.     }
  198.  
  199.     /*
  200.      *  By here,
  201.      *
  202.      *    +  our target page has been split;
  203.      *    +  the original tuple has been inserted;
  204.      *    +  we have write locks on both the old (left half) and new
  205.      *       (right half) buffers, after the split; and
  206.      *    +  we have the key we want to insert into the parent.
  207.      *
  208.      *  Do the parent insertion.  We need to hold onto the locks for
  209.      *  the child pages until we locate the parent, but we can release
  210.      *  them before doing the actual insertion (see Lehman and Yao for
  211.      *  the reasoning).
  212.      */
  213.  
  214.     if (stack == (NOBTStack) NULL) {
  215.  
  216.         /* create a new root node and release the split buffers */
  217.         _nobt_newroot(rel, buf, rbuf);
  218.         _nobt_relbuf(rel, buf, NOBT_WRITE);
  219.         _nobt_relbuf(rel, rbuf, NOBT_WRITE);
  220.  
  221.     } else {
  222.  
  223.         /* form a index tuple that points at the new right page */
  224.         rbknum = BufferGetBlockNumber(rbuf);
  225.         rpage = BufferGetPage(rbuf, 0);
  226.         rpageop = (NOBTPageOpaque) PageGetSpecialPointer(rpage);
  227.  
  228.         /*
  229.          *  By convention, the first entry (0) on every non-leftmost page
  230.          *  is the high key for that page.  In order to get the lowest key
  231.          *  on the new right page, we actually look at its second (1) entry.
  232.          */
  233.  
  234.         if (rpageop->nobtpo_next != P_NONE) {
  235.         if (isleaf) {
  236.             rlitem = (NOBTLItem) PageGetItem(rpage,
  237.                              PageGetItemId(rpage, 1));
  238.         } else {
  239.             riitem = (NOBTIItem) PageGetItem(rpage,
  240.                              PageGetItemId(rpage, 1));
  241.         }
  242.         } else {
  243.         if (isleaf) {
  244.             rlitem = (NOBTLItem) PageGetItem(rpage,
  245.                              PageGetItemId(rpage, 0));
  246.         } else {
  247.             riitem = (NOBTIItem) PageGetItem(rpage,
  248.                              PageGetItemId(rpage, 0));
  249.         }
  250.         }
  251.  
  252.         /* get a unique btitem for this key */
  253.         if (isleaf) {
  254.         datump = (char *) rlitem;
  255.         datump += sizeof(NOBTLItemData);
  256.         dsize = IndexTupleSize(&(rlitem->nobtli_itup)) - sizeof(IndexTupleData);
  257.         } else {
  258.         datump = (char *) riitem;
  259.         datump += sizeof(NOBTIItemData);
  260.         dsize = NOBTIItemGetSize(riitem) - sizeof(NOBTIItemData);
  261.         }
  262.         new_item = _nobt_formiitem(rbknum, datump, dsize);
  263.  
  264.         /* find the parent buffer */
  265.         pbuf = _nobt_getstackbuf(rel, stack, NOBT_WRITE);
  266.  
  267.         /* xyzzy should use standard l&y "walk right" alg here */
  268.         ppage = BufferGetPage(pbuf, 0);
  269.         lftitem = (NOBTIItem) PageGetItem(ppage,
  270.                             PageGetItemId(ppage,
  271.                           stack->nobts_offset));
  272.  
  273. #ifdef    SHADOW
  274.         lftitem->nobtii_oldchild = lftitem->nobtii_child;
  275. #endif    /* SHADOW */
  276.  
  277.         lftitem->nobtii_child = BufferGetBlockNumber(buf);
  278.  
  279.         /* don't need the children anymore */
  280.         _nobt_relbuf(rel, buf, NOBT_WRITE);
  281.         _nobt_relbuf(rel, rbuf, NOBT_WRITE);
  282.  
  283.         newskey = _nobt_imkscankey(rel, new_item);
  284.         newres = _nobt_insertonpg(rel, pbuf, stack->nobts_parent,
  285.                     keysz, newskey, new_item,
  286.                     stack->nobts_btitem);
  287.  
  288.         /* be tidy */
  289.         pfree(newres);
  290.         pfree(newskey);
  291.         pfree(new_item);
  292.     }
  293.     } else {
  294.     itup_off = _nobt_pgaddtup(rel, buf, keysz, scankey,
  295.                 itemsz, btvoid, afteritem);
  296.     itup_blkno = BufferGetBlockNumber(buf);
  297.  
  298.     _nobt_relbuf(rel, buf, NOBT_WRITE);
  299.     }
  300.  
  301.     /* by here, the new tuple is inserted */
  302.     res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
  303.     ItemPointerSet(&(res->pointerData), 0, itup_blkno, 0, itup_off);
  304.     res->lock = (RuleLock) NULL;
  305.     res->offset = (double) 0;
  306.  
  307.     return (res);
  308. }
  309.  
  310. /*
  311.  *  _nobt_bsplit() -- split a page in the btree.
  312.  *
  313.  *    On entry, buf is the page to split, and is write-locked and pinned.
  314.  *    Returns the new right sibling of buf, pinned and write-locked.  The
  315.  *    pin and lock on buf are maintained.
  316.  *
  317.  *    This routine is used only when the tree is being built; for normal
  318.  *    insertions, we must not reuse the left-hand page, and we must manage
  319.  *    the list of free pages correctly.
  320.  */
  321.  
  322. Buffer
  323. _nobt_bsplit(rel, buf)
  324.     Relation rel;
  325.     Buffer buf;
  326. {
  327.     Buffer rbuf;
  328.     Page origpage;
  329.     Page leftpage, rightpage;
  330.     BlockNumber lbkno, rbkno;
  331.     NOBTPageOpaque ropaque, lopaque, oopaque;
  332.     Buffer sbuf;
  333.     Page spage;
  334.     NOBTPageOpaque sopaque;
  335.     int itemsz;
  336.     ItemId itemid;
  337.     NOBTIItem iitem;
  338.     int nleft, nright;
  339.     OffsetIndex start;
  340.     OffsetIndex maxoff;
  341.     OffsetIndex firstright;
  342.     OffsetIndex i;
  343.     Size llimit;
  344. #ifdef    REORG
  345.     int save_left_ind;
  346.     LocationIndex save_left_lower;
  347.     LocationIndex save_left_upper;
  348.     PageHeader left_phdr;
  349. #endif    /* REORG */
  350.  
  351.     rbuf = _nobt_getbuf(rel, P_NEW, NOBT_WRITE);
  352.     origpage = BufferGetPage(buf, 0);
  353.     leftpage = PageGetTempPage(origpage, sizeof(NOBTPageOpaqueData));
  354.     rightpage = BufferGetPage(rbuf, 0);
  355.  
  356.     _nobt_pageinit(rightpage);
  357.     _nobt_pageinit(leftpage);
  358.  
  359.     /* init btree private data */
  360.     oopaque = (NOBTPageOpaque) PageGetSpecialPointer(origpage);
  361.     lopaque = (NOBTPageOpaque) PageGetSpecialPointer(leftpage);
  362.     ropaque = (NOBTPageOpaque) PageGetSpecialPointer(rightpage);
  363.  
  364.     /* if we're splitting this page, it won't be the root when we're done */
  365.     oopaque->nobtpo_flags &= ~NOBTP_ROOT;
  366.     lopaque->nobtpo_flags = ropaque->nobtpo_flags = oopaque->nobtpo_flags;
  367.     lopaque->nobtpo_prev = oopaque->nobtpo_prev;
  368.     ropaque->nobtpo_prev = BufferGetBlockNumber(buf);
  369.     lopaque->nobtpo_next = BufferGetBlockNumber(rbuf);
  370.     ropaque->nobtpo_next = oopaque->nobtpo_next;
  371. #ifndef    NORMAL
  372.     lopaque->nobtpo_linktok = CurrentLinkToken;
  373. #endif    /* ndef NORMAL */
  374.  
  375.     /*
  376.      *  If the page we're splitting is not the rightmost page at its level
  377.      *  in the tree, then the first (0) entry on the page is the high key
  378.      *  for the page.  We need to copy that to the right half.  Otherwise,
  379.      *  we should treat the line pointers beginning at zero as user data.
  380.      *
  381.      *  We leave a blank space at the start of the line table for the left
  382.      *  page.  We'll come back later and fill it in with the high key item
  383.      *  we get from the right key.
  384.      */
  385.  
  386.     nleft = 2;
  387.     nright = 1;
  388.     if ((ropaque->nobtpo_next = oopaque->nobtpo_next) != P_NONE) {
  389.     start = 1;
  390.     itemid = PageGetItemId(origpage, 0);
  391.     itemsz = ItemIdGetLength(itemid);
  392.     iitem = (NOBTIItem) PageGetItem(origpage, itemid);
  393.     PageAddItem(rightpage, iitem, itemsz, nright++, LP_USED);
  394.     } else {
  395.     start = 0;
  396.     }
  397.     maxoff = PageGetMaxOffsetIndex(origpage);
  398.     llimit = PageGetFreeSpace(leftpage) / 2;
  399.     firstright = _nobt_findsplitloc(rel, origpage, start, maxoff, llimit);
  400.  
  401.     for (i = start; i <= maxoff; i++) {
  402.     itemid = PageGetItemId(origpage, i);
  403.     itemsz = ItemIdGetLength(itemid);
  404.     iitem = (NOBTIItem) PageGetItem(origpage, itemid);
  405.  
  406.     /* decide which page to put it on */
  407.     if (i < firstright) {
  408.         PageAddItem(leftpage, iitem, itemsz, nleft++, LP_USED);
  409.     } else {
  410. #ifdef    REORG
  411.         /* for reorg, must add to left page, too */
  412.         if (i == firstright) {
  413.         left_phdr = (PageHeader) leftpage;
  414.         save_left_ind = nleft;
  415.         save_left_lower = left_phdr->pd_lower;
  416.         save_left_upper = left_phdr->pd_upper;
  417.         }
  418.         PageAddItem(leftpage, iitem, itemsz, nleft++, LP_USED);
  419. #endif    /* REORG */
  420.         PageAddItem(rightpage, iitem, itemsz, nright++, LP_USED);
  421.     }
  422.     }
  423. #ifdef    REORG
  424.     /* need to clean out flags in the linp entries, reset free space */
  425.     for (i = save_left_ind; i < nleft; i++) {
  426.     left_phdr->pd_linp[i].lp_flags &= ~LP_USED;
  427.     }
  428.     lopaque->nobtpo_oldlow = left_phdr->pd_lower;
  429.     left_phdr->pd_lower = save_left_lower;
  430.     left_phdr->pd_upper = save_left_upper;
  431. #endif    /* REORG */
  432.  
  433.     /*
  434.      *  Okay, page has been split, high key on right page is correct.  Now
  435.      *  set the high key on the left page to be the min key on the right
  436.      *  page.
  437.      */
  438.  
  439.     if (ropaque->nobtpo_next == P_NONE)
  440.     itemid = PageGetItemId(rightpage, 0);
  441.     else
  442.     itemid = PageGetItemId(rightpage, 1);
  443.     itemsz = ItemIdGetLength(itemid);
  444.     iitem = (NOBTIItem) PageGetItem(rightpage, itemid);
  445.  
  446.     /*
  447.      *  We left a hole for the high key on the left page; fill it.  The
  448.      *  modal crap is to tell the page manager to put the new item on the
  449.      *  page and not screw around with anything else.  Whoever designed
  450.      *  this interface has presumably crawled back into the dung heap they
  451.      *  came from.  No one here will admit to it.
  452.      */
  453.  
  454.     PageManagerModeSet(OverwritePageManagerMode);
  455.     PageAddItem(leftpage, iitem, itemsz, 1, LP_USED);
  456.     PageManagerModeSet(ShufflePageManagerMode);
  457.  
  458.     /*
  459.      *  By here, the original data page has been split into two new halves,
  460.      *  and these are correct.  The algorithm requires that the left page
  461.      *  never move during a split, so we copy the new left page back on top
  462.      *  of the original.  Note that this is not a waste of time, since we
  463.      *  also require (in the page management code) that the center of a
  464.      *  page always be clean, and the most efficient way to guarantee this
  465.      *  is just to compact the data by reinserting it into a new left page.
  466.      */
  467.  
  468.     PageRestoreTempPage(leftpage, origpage);
  469.  
  470.     /* write these guys out */
  471.     _nobt_wrtnorelbuf(rel, rbuf);
  472.     _nobt_wrtnorelbuf(rel, buf);
  473.  
  474.     /*
  475.      *  Finally, we need to grab the right sibling (if any) and fix the
  476.      *  prev pointer there.  We are guaranteed that this is deadlock-free
  477.      *  since no other writer will be moving holding a lock on that page
  478.      *  and trying to move left, and all readers release locks on a page
  479.      *  before trying to fetch its neighbors.
  480.      */
  481.  
  482.     if (ropaque->nobtpo_next != P_NONE) {
  483.     sbuf = _nobt_getbuf(rel, ropaque->nobtpo_next, NOBT_WRITE);
  484.     spage = BufferGetPage(sbuf, 0);
  485.     sopaque = (NOBTPageOpaque) PageGetSpecialPointer(spage);
  486.     sopaque->nobtpo_prev = BufferGetBlockNumber(rbuf);
  487.  
  488.     /* write and release the old right sibling */
  489.     _nobt_wrtbuf(rel, sbuf);
  490.     }
  491.  
  492.     /* split's done */
  493.     return (rbuf);
  494. }
  495.  
  496. #ifdef    SHADOW
  497. /*
  498.  *  _nobt_nsplit() -- split a page in the btree.
  499.  *
  500.  *    On entry, buf is the page to split, and is write-locked and pinned.
  501.  *    Returns the new right sibling of buf, pinned and write-locked.  The
  502.  *    pin and lock on buf are maintained.
  503.  *
  504.  *    This routine is used only during normal page splits.  It does not
  505.  *    reuse the left page, and manages the free list correctly.  During
  506.  *    the initial build of the index, we don't need to bother with this,
  507.  *    since building the index either commits or aborts atomically.
  508.  */
  509.  
  510. Buffer
  511. _nobt_nsplit(rel, bufP)
  512.     Relation rel;
  513.     Buffer *bufP;
  514. {
  515.     Buffer rbuf;
  516.     Buffer buf;
  517.     Page origpage;
  518.     Page leftpage, rightpage;
  519.     BlockNumber lbkno, rbkno;
  520.     NOBTPageOpaque ropaque, lopaque, oopaque;
  521.     Buffer sbuf;
  522.     Page spage;
  523.     NOBTPageOpaque sopaque;
  524.     int itemsz;
  525.     ItemId itemid;
  526.     NOBTIItem iitem;
  527.     int nleft, nright;
  528.     OffsetIndex start;
  529.     OffsetIndex maxoff;
  530.     OffsetIndex firstright;
  531.     OffsetIndex i;
  532.     Size llimit;
  533.     PageNumber repl;
  534.     Buffer newsbuf;
  535.     Page newspage;
  536.     bool nobackup;
  537.     NOBTPageOpaque newsopaque;
  538.  
  539.     buf = _nobt_getbuf(rel, P_NEW, NOBT_WRITE);
  540.     rbuf = _nobt_getbuf(rel, P_NEW, NOBT_WRITE);
  541.     origpage = BufferGetPage(*bufP, 0);
  542.     leftpage = BufferGetPage(buf, 0);
  543.     rightpage = BufferGetPage(rbuf, 0);
  544.  
  545.     _nobt_pageinit(leftpage);
  546.     _nobt_pageinit(rightpage);
  547.  
  548.     /* init btree private data */
  549.     oopaque = (NOBTPageOpaque) PageGetSpecialPointer(origpage);
  550.     lopaque = (NOBTPageOpaque) PageGetSpecialPointer(leftpage);
  551.     ropaque = (NOBTPageOpaque) PageGetSpecialPointer(rightpage);
  552.  
  553.  
  554.     /* if we're splitting this page, it won't be the root when we're done */
  555.     lopaque->nobtpo_flags = ropaque->nobtpo_flags
  556.     = (oopaque->nobtpo_flags & ~NOBTP_ROOT);
  557.  
  558.     /*
  559.      *  In order to guarantee atomicity of updates, we need a sentinel value
  560.      *  in the peer pointers between the pages.  Note that we set these before
  561.      *  we establish any path to the new pages.  We can set the peer pointers
  562.      *  between the two new pages, since no one else will screw around with
  563.      *  those.  There's no path to the new pages yet, so we don't need to
  564.      *  bother locking the peer pointers before we update them.
  565.      */
  566.  
  567.     lopaque->nobtpo_prev = P_NONE;
  568.     ropaque->nobtpo_next = P_NONE;
  569.     ropaque->nobtpo_prev = BufferGetBlockNumber(buf);
  570.     lopaque->nobtpo_next = BufferGetBlockNumber(rbuf);
  571.  
  572.     /*
  573.      *  No need for a critical section here, since these pages can't be
  574.      *  written out by anybody.
  575.      */
  576.  
  577.     lopaque->nobtpo_nexttok = ropaque->nobtpo_prevtok = CurrentLinkToken;
  578.  
  579.     /*
  580.      *  Order of operations here is critical.  First, make sure that the
  581.      *  link pointer tokens for the 'replaced' link agree.  Then set the
  582.      *  'replaced' link for the original page.
  583.      *
  584.      *  The fake critical section calls block concurrent processes from
  585.      *  doing a 'commit' (and associated sync) while we're in a critical
  586.      *  section.
  587.      */
  588.  
  589. #define CRIT_SEC_START
  590. #define CRIT_SEC_END
  591.  
  592.     CRIT_SEC_START;
  593.     oopaque->nobtpo_repltok = lopaque->nobtpo_linktok = CurrentLinkToken;
  594.     CRIT_SEC_END;
  595.     oopaque->nobtpo_replaced = BufferGetBlockNumber(buf);
  596.  
  597.     /*
  598.      *  Now a path to the new pages exists (via the replaced link from the
  599.      *  old page).  Concurrent updates in the tree in adjacent pages will
  600.      *  follow this link if they want to update the peer pointers on the
  601.      *  (old) page.  For that reason, we issue a peer lock on the peer pointer
  602.      *  values before we update them.  If they've been updated already, then
  603.      *  we leave them as they are, since the update that changed them knew
  604.      *  what it was doing.
  605.      */
  606.  
  607.     if (lopaque->nobtpo_prev == P_NONE)
  608.     lopaque->nobtpo_prev = oopaque->nobtpo_prev;
  609.  
  610.     if (ropaque->nobtpo_next == P_NONE)
  611.     ropaque->nobtpo_next = oopaque->nobtpo_next;
  612.  
  613.     /*
  614.      *  If the page we're splitting is not the rightmost page at its level
  615.      *  in the tree, then the first (0) entry on the page is the high key
  616.      *  for the page.  We need to copy that to the right half.  Otherwise,
  617.      *  we should treat the line pointers beginning at zero as user data.
  618.      *
  619.      *  We leave a blank space at the start of the line table for the left
  620.      *  page.  We'll come back later and fill it in with the high key item
  621.      *  we get from the right key.
  622.      */
  623.  
  624.     nleft = 2;
  625.     nright = 1;
  626.     if (ropaque->nobtpo_next != P_NONE) {
  627.     start = 1;
  628.     itemid = PageGetItemId(origpage, 0);
  629.     itemsz = ItemIdGetLength(itemid);
  630.     iitem = (NOBTIItem) PageGetItem(origpage, itemid);
  631.     PageAddItem(rightpage, iitem, itemsz, nright++, LP_USED);
  632.     } else {
  633.     start = 0;
  634.     }
  635.  
  636.     maxoff = PageGetMaxOffsetIndex(origpage);
  637.     llimit = PageGetFreeSpace(leftpage) / 2;
  638.     firstright = _nobt_findsplitloc(rel, origpage, start, maxoff, llimit);
  639.  
  640.     for (i = start; i <= maxoff; i++) {
  641.     itemid = PageGetItemId(origpage, i);
  642.     itemsz = ItemIdGetLength(itemid);
  643.     iitem = (NOBTIItem) PageGetItem(origpage, itemid);
  644.  
  645.     /* decide which page to put it on */
  646.     if (i < firstright)
  647.         PageAddItem(leftpage, iitem, itemsz, nleft++, LP_USED);
  648.     else
  649.         PageAddItem(rightpage, iitem, itemsz, nright++, LP_USED);
  650.     }
  651.  
  652.     /*
  653.      *  Okay, page has been split, high key on right page is correct.  Now
  654.      *  set the high key on the left page to be the min key on the right
  655.      *  page.
  656.      */
  657.  
  658.     if (ropaque->nobtpo_next == P_NONE)
  659.     itemid = PageGetItemId(rightpage, 0);
  660.     else
  661.     itemid = PageGetItemId(rightpage, 1);
  662.     itemsz = ItemIdGetLength(itemid);
  663.     iitem = (NOBTIItem) PageGetItem(rightpage, itemid);
  664.  
  665.     /*
  666.      *  We left a hole for the high key on the left page; fill it.  The
  667.      *  modal crap is to tell the page manager to put the new item on the
  668.      *  page and not screw around with anything else.  Whoever designed
  669.      *  this interface has presumably crawled back into the dung heap they
  670.      *  came from.  No one here will admit to it.
  671.      */
  672.  
  673.     PageManagerModeSet(OverwritePageManagerMode);
  674.     PageAddItem(leftpage, iitem, itemsz, 1, LP_USED);
  675.     PageManagerModeSet(ShufflePageManagerMode);
  676.  
  677.     /*
  678.      *  By here, the original data page has been split into two new halves,
  679.      *  and these are correct.  The original buffer has had its 'replaced'
  680.      *  entry updated as appropriate.  Write these guys out.
  681.      */
  682.  
  683.     _nobt_wrtnorelbuf(rel, rbuf);
  684.     _nobt_wrtnorelbuf(rel, buf);
  685.  
  686.     /* can afford to drop the lock on the original and switch to the new */
  687.     _nobt_wrtbuf(rel, *bufP);
  688.     *bufP = buf;
  689.  
  690.     /*
  691.      *  Finally, we need to adjust the link pointers among siblings.  Since
  692.      *  we've added new pages, we have to adjust the sibling pointers on
  693.      *  either side of the new pages in the tree.
  694.      *
  695.      *  On-demand recovery requires that we detect and handle the case where
  696.      *  our (left or right) sibling in the tree was split, and not all pages
  697.      *  affected by the split made it to disk before a crash.  The major idea
  698.      *  is to defer repairing the page for as long as possible.  To that end,
  699.      *  what we do here is only update the peer pointer of the last page that
  700.      *  made it safely to disk.
  701.      *
  702.      *  In the case that the 'replaced' link is broken, we know that the peer
  703.      *  pointer on the source page was correctly updated, and that the 'old'
  704.      *  value for the peer pointer in the page opaque data must be preserved
  705.      *  until the page is fixed.
  706.      */
  707.  
  708.     nobackup = false;
  709.     if (ropaque->nobtpo_next != P_NONE) {
  710.     sbuf = _nobt_getbuf(rel, ropaque->nobtpo_next, NOBT_WRITE);
  711.     spage = BufferGetPage(sbuf, 0);
  712.     sopaque = (NOBTPageOpaque) PageGetSpecialPointer(spage);
  713.     while ((repl = sopaque->nobtpo_replaced) != P_NONE) {
  714.         newsbuf = _nobt_getbuf(rel, repl, NOBT_WRITE);
  715.         newspage = BufferGetPage(sbuf, 0);
  716.         newsopaque = (NOBTPageOpaque) PageGetSpecialPointer(newspage);
  717.  
  718.         /*
  719.          *  Determine whether this is a valid link.  If so, traverse
  720.          *  it.  Otherwise, we've gotten as far as we need to; bust
  721.          *  out.  Someone later will repair the damage we've just
  722.          *  detected.  This will happen when an insert or read of a
  723.          *  datum on the page with the broken link happens.
  724.          */
  725.  
  726.         if (sopaque->nobtpo_repltok != newsopaque->nobtpo_linktok) {
  727.  
  728.         /*
  729.          *  Link is broken.  Someone later will fix it.  For now,
  730.          *  we want to preserve the old peer pointer and link token,
  731.          *  since they may be needed to do the repair.  Store the
  732.          *  new peer pointer without backing up the old one.
  733.          *
  734.          *  The "break" is for the while (repl = ...) loop.
  735.          */
  736.  
  737.         nobackup = true;
  738.         break;
  739.         } else {
  740.  
  741.         /*
  742.          *  Traverse the replaced link and move the lock down.
  743.          */
  744.  
  745.         _nobt_relbuf(rel, sbuf, NOBT_WRITE);
  746.         sopaque = newsopaque;
  747.  
  748.         /* still have newsbuf locked with PREVLNK */
  749.         sbuf = newsbuf;
  750.         }
  751.     }
  752.  
  753.     /*
  754.      *  We need to hold the 'replaced' link lock until we have the
  755.      *  peer pointers correct.
  756.      */
  757.  
  758.     if (!nobackup)
  759.         sopaque->nobtpo_oldprev = sopaque->nobtpo_prev;
  760.  
  761.     sopaque->nobtpo_prev = BufferGetBlockNumber(rbuf);
  762.     CRIT_SEC_START;
  763.     ropaque->nobtpo_nexttok = sopaque->nobtpo_prevtok = CurrentLinkToken;
  764.     CRIT_SEC_END;
  765.  
  766.     /* write and release the old right sibling */
  767.     _nobt_wrtbuf(rel, sbuf);
  768.     }
  769.  
  770.     nobackup = false;
  771.     if (lopaque->nobtpo_prev != P_NONE) {
  772.     sbuf = _nobt_getbuf(rel, ropaque->nobtpo_next, NOBT_WRITE);
  773.     spage = BufferGetPage(sbuf, 0);
  774.     sopaque = (NOBTPageOpaque) PageGetSpecialPointer(spage);
  775.     while ((repl = sopaque->nobtpo_replaced) != P_NONE) {
  776.         newsbuf = _nobt_getbuf(rel, repl, NOBT_WRITE);
  777.         newspage = BufferGetPage(sbuf, 0);
  778.         newsopaque = (NOBTPageOpaque) PageGetSpecialPointer(newspage);
  779.  
  780.         /*
  781.          *  Determine whether this is a valid link.  If so, traverse
  782.          *  it.  Otherwise, we've gotten as far as we need to.
  783.          */
  784.  
  785.         if (sopaque->nobtpo_repltok != newsopaque->nobtpo_linktok) {
  786.  
  787.         /*
  788.          *  Link is broken.  Someone later will fix it.  For now,
  789.          *  we want to preserve the old peer pointer and link token,
  790.          *  since they may be needed to do the repair.  Store the
  791.          *  new peer pointer without backing up the old one.
  792.          *
  793.          *  The "break" is for the while (repl = ...) loop.
  794.          */
  795.  
  796.         nobackup = true;
  797.         break;
  798.         } else {
  799.  
  800.         /*
  801.          *  Traverse the replaced link and move the lock down.
  802.          */
  803.  
  804.         _nobt_relbuf(rel, sbuf, NOBT_WRITE);
  805.         sopaque = newsopaque;
  806.         sbuf = newsbuf;
  807.         }
  808.     }
  809.  
  810.     /*
  811.      *  By here, we have sopaque->nobtpo_next locked.  Need to hold the
  812.      *  'replaced' lock (which we also hold) unitl the peer pointers are
  813.      *  correct.
  814.      */
  815.  
  816.     if (!nobackup)
  817.         sopaque->nobtpo_oldnext = sopaque->nobtpo_next;
  818.  
  819.     sopaque->nobtpo_next = BufferGetBlockNumber(buf);
  820.     CRIT_SEC_START;
  821.     lopaque->nobtpo_prevtok = sopaque->nobtpo_nexttok = CurrentLinkToken;
  822.     CRIT_SEC_END;
  823.  
  824.     /* write and release the old right sibling */
  825.     _nobt_wrtbuf(rel, sbuf);
  826.     }
  827.  
  828.     /* split's done */
  829.     return (rbuf);
  830. }
  831. #endif    /* SHADOW */
  832.  
  833. /*
  834.  *  _nobt_findsplitloc() -- find a safe place to split a page.
  835.  *
  836.  *    In order to guarantee the proper handling of searches for duplicate
  837.  *    keys, the first duplicate in the chain must either be the first
  838.  *    item on the page after the split, or the entire chain must be on
  839.  *    one of the two pages.  That is,
  840.  *        [1 2 2 2 3 4 5]
  841.  *    must become
  842.  *        [1] [2 2 2 3 4 5]
  843.  *    or
  844.  *        [1 2 2 2] [3 4 5]
  845.  *    but not
  846.  *        [1 2 2] [2 3 4 5].
  847.  *    However,
  848.  *        [2 2 2 2 2 3 4]
  849.  *    may be split as
  850.  *        [2 2 2 2] [2 3 4].
  851.  *
  852.  *  For the no-overwrite implementation, we assume no duplicates.
  853.  */
  854.  
  855. OffsetIndex
  856. _nobt_findsplitloc(rel, page, start, maxoff, llimit)
  857.     Relation rel;
  858.     Page page;
  859.     OffsetIndex start;
  860.     OffsetIndex maxoff;
  861.     Size llimit;
  862. {
  863.     OffsetIndex maxoff;
  864.     OffsetIndex splitloc;
  865.  
  866.     maxoff = PageGetMaxOffsetIndex(page);
  867.     splitloc = maxoff / 2;
  868.  
  869.     return (splitloc);
  870. }
  871.  
  872. /*
  873.  *  _nobt_newroot() -- Create a new root page for the index.
  874.  *
  875.  *    We've just split the old root page and need to create a new one.
  876.  *    In order to do this, we add a new root page to the file, then lock
  877.  *    the metadata page and update it.  This is guaranteed to be deadlock-
  878.  *    free, because all readers release their locks on the metadata page
  879.  *    before trying to lock the root, and all writers lock the root before
  880.  *    trying to lock the metadata page.  We have a write lock on the old
  881.  *    root page, so we have not introduced any cycles into the waits-for
  882.  *    graph.
  883.  *
  884.  *    On entry, lbuf (the old root) and rbuf (its new peer) are write-
  885.  *    locked.  We don't drop the locks in this routine; that's done by
  886.  *    the caller.  On exit, a new root page exists with entries for the
  887.  *    two new children.  The new root page is neither pinned nor locked.
  888.  */
  889.  
  890. _nobt_newroot(rel, lbuf, rbuf)
  891.     Relation rel;
  892.     Buffer lbuf;
  893.     Buffer rbuf;
  894. {
  895.     Buffer rootbuf;
  896.     BlockNumber lbkno, rbkno;
  897.     Page lpage, rpage, rootpage;
  898.     NOBTPageOpaque lopaque, ropaque;
  899.     BlockNumber rootbknum;
  900.     NOBTPageOpaque rootopaque;
  901.     ItemId itemid;
  902.     NOBTIItem iitem;
  903.     NOBTLItem litem;
  904.     int itemsz;
  905.     NOBTIItem new_item;
  906.     Size dsize;
  907.     char *datump;
  908.  
  909.     /* get a new root page */
  910.     rootbuf = _nobt_getbuf(rel, P_NEW, NOBT_WRITE);
  911.     rootpage = BufferGetPage(rootbuf, 0);
  912.     _nobt_pageinit(rootpage);
  913.  
  914.     /* set btree special data */
  915.     rootopaque = (NOBTPageOpaque) PageGetSpecialPointer(rootpage);
  916.     rootopaque->nobtpo_prev = rootopaque->nobtpo_next = P_NONE;
  917.     rootopaque->nobtpo_flags |= NOBTP_ROOT;
  918.  
  919.     /*
  920.      *  Insert the internal tuple pointers.
  921.      */
  922.  
  923.     lbkno = BufferGetBlockNumber(lbuf);
  924.     rbkno = BufferGetBlockNumber(rbuf);
  925.     lpage = BufferGetPage(lbuf, 0);
  926.     rpage = BufferGetPage(rbuf, 0);
  927.     lopaque = (NOBTPageOpaque) PageGetSpecialPointer(lpage);
  928.     ropaque = (NOBTPageOpaque) PageGetSpecialPointer(rpage);
  929.  
  930.     /* step over the high key on the left page */
  931.     itemid = PageGetItemId(lpage, 1);
  932.     itemsz = ItemIdGetLength(itemid);
  933.     if (lopaque->nobtpo_flags & NOBTP_LEAF) {
  934.         litem = (NOBTLItem) PageGetItem(lpage, itemid);
  935.     datump = (char *) litem;
  936.     datump += sizeof(NOBTLItemData);
  937.     dsize = itemsz - sizeof(IndexTupleData);
  938.     } else {
  939.         iitem = (NOBTIItem) PageGetItem(lpage, itemid);
  940.     datump = (char *) iitem;
  941.     datump += sizeof(NOBTIItemData);
  942.     dsize = itemsz - sizeof(NOBTIItemData);
  943.     }
  944.     new_item = _nobt_formiitem(lbkno, datump, dsize);
  945.  
  946.     /* insert the left page pointer */
  947.     PageAddItem(rootpage, new_item, NOBTIItemGetSize(new_item), 1, LP_USED);
  948.     pfree(new_item);
  949.  
  950.     itemid = PageGetItemId(rpage, 0);
  951.     itemsz = ItemIdGetLength(itemid);
  952.     if (ropaque->nobtpo_flags & NOBTP_LEAF) {
  953.         litem = (NOBTLItem) PageGetItem(rpage, itemid);
  954.     datump = (char *) litem;
  955.     datump += sizeof(NOBTLItemData);
  956.     dsize = itemsz - sizeof(IndexTupleData);
  957.     } else {
  958.         iitem = (NOBTIItem) PageGetItem(rpage, itemid);
  959.     datump = (char *) iitem;
  960.     datump += sizeof(NOBTIItemData);
  961.     dsize = itemsz - sizeof(NOBTIItemData);
  962.     }
  963.     new_item = _nobt_formiitem(rbkno, datump, dsize);
  964.  
  965.     /* insert the right page pointer */
  966.     PageAddItem(rootpage, new_item, NOBTIItemGetSize(new_item), 2, LP_USED);
  967.     pfree (new_item);
  968.  
  969.     /* write and let go of the root buffer */
  970.     rootbknum = BufferGetBlockNumber(rootbuf);
  971.     _nobt_wrtbuf(rel, rootbuf);
  972.  
  973.     /* update metadata page with new root block number */
  974.     _nobt_metaproot(rel, rootbknum);
  975. }
  976.  
  977. /*
  978.  *  _nobt_pgaddtup() -- add a tuple to a particular page in the index.
  979.  *
  980.  *    This routine adds the tuple to the page as requested, and keeps the
  981.  *    write lock and reference associated with the page's buffer.  It is
  982.  *    an error to call pgaddtup() without a write lock and reference.  If
  983.  *    afteritem is non-null, it's the item that we expect our new item
  984.  *    to follow.  Otherwise, we do a binary search for the correct place
  985.  *    and insert the new item there.
  986.  */
  987.  
  988. OffsetIndex
  989. _nobt_pgaddtup(rel, buf, keysz, itup_scankey, itemsize, btvoid, voidafter)
  990.     Relation rel;
  991.     Buffer buf;
  992.     int keysz;
  993.     ScanKey itup_scankey;
  994.     int itemsize;
  995.     char *btvoid;
  996.     char *voidafter;
  997. {
  998.     Page page;
  999.     OffsetIndex itup_off;
  1000.  
  1001.     page = BufferGetPage(buf, 0);
  1002.     itup_off = _nobt_binsrch(rel, buf, keysz, itup_scankey, NOBT_INSERTION);
  1003.     PageAddItem(page, btvoid, itemsize, itup_off + 1, LP_USED);
  1004.  
  1005.     /* write the buffer, but hold our lock */
  1006.     _nobt_wrtnorelbuf(rel, buf);
  1007.  
  1008.     return (itup_off);
  1009. }
  1010.  
  1011. /*
  1012.  *  _nobt_goesonpg() -- Does a new tuple belong on this page?
  1013.  *
  1014.  *    This is part of the complexity introduced by allowing duplicate
  1015.  *    keys into the index.  The tuple belongs on this page if:
  1016.  *
  1017.  *        + there is no page to the right of this one; or
  1018.  *        + it is less than the high key on the page; or
  1019.  *        + the item it is to follow ("afteritem") appears on this
  1020.  *          page.
  1021.  */
  1022.  
  1023. bool
  1024. _nobt_goesonpg(rel, buf, keysz, scankey, afteritem)
  1025.     Relation rel;
  1026.     Buffer buf;
  1027.     Size keysz;
  1028.     ScanKey scankey;
  1029.     NOBTIItem afteritem;
  1030. {
  1031.     Page page;
  1032.     NOBTPageOpaque opaque;
  1033.     ItemId hikey;
  1034.  
  1035.     page = BufferGetPage(buf, 0);
  1036.  
  1037.     /* no right neighbor? */
  1038.     opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  1039.     if (opaque->nobtpo_next == P_NONE)
  1040.     return (true);
  1041.  
  1042.     /* < the high key? */
  1043.     hikey = PageGetItemId(page, 0);
  1044.     if (_nobt_skeycmp(rel, keysz, scankey, page, hikey, NOBTLessStrategyNumber))
  1045.     return (true);
  1046.  
  1047.     /*
  1048.      *  If the scan key is > the high key, then it for sure doesn't belong
  1049.      *  here.
  1050.      */
  1051.  
  1052.     return (false);
  1053. }
  1054. #endif /* NOBTREE */
  1055.